package com.hadoop.code;

import java.util.LinkedList;

/**
 *
 *  HDFS：有个别地方源码写得不错的。 Hadoop2.3.0
 *  kafka Zookeeper
 *https://www.cnblogs.com/zhengbin/p/5654805.html
 * 加锁，多线程共享变量需要加锁
 */
public class FSEditLog {
    private long txid=0L;
    private DoubleBuffer editLogBuffer=new DoubleBuffer();
    //当前是否正在往磁盘里面刷写数据
    private volatile Boolean isSyncRunning = false;
    private volatile Boolean isWaitSync = false;

    private volatile Long syncMaxTxid = 0L;
    /**
     * 一个线程 就会有自己一个ThreadLocal的副本
     * 当每个线程中都各自创建一个对象时，这样每个线程都有自己的这个ThreadLocal的副本
     */
    private ThreadLocal<Long> localTxid=new ThreadLocal<Long>();

    public static void main(String[] args) {
        //logEdit("mkdir /data")
    }

    /**
     * 写元数据日志方法
     *
     * @param content
     */
    public void logEdit(String content){//mkdir /data
        /**
         * 因为我们要保证日志的ID要递增，唯一
         *
         * 线程1，线程2， 线程3
         */
        synchronized (this){
            //线程1
            //日志的ID号，元数据信息的ID号。
            txid++; //1
            /**
             * 每个线程都会有自己的一个副本。
             * 对象中有个ThreadLocal，在堆内存中，
             * 每个线程都可以访问到这个Threadlocal,并保存一个副本在自己的线程中，栈内存中
             * 线程1，1
             * 线程2，2
             * 线程3，3
             */
            localTxid.set(txid);


            EditLog log = new EditLog(txid, content);
                //往内存里面写数据
            editLogBuffer.write(log);
        } //释放锁

        /**
         * 内存1：
         * 线程1，1 元数据1
         * 线程2，2 元数据2
         * 线程3，3 元数据3
         */
        logSync();
    }

    private  void logSync(){
        /**
         *
         * 线程1，ID号：1
         *
         * 线程2 进来了
         *
         * 线程3 进来了 return
         *
         * 线程4 进来了
         * 线程5 进来了
         */
        synchronized (this){
            //当前是否正在往磁盘写数据，默认是false
            if(isSyncRunning){
                    //当前的元数据信息的编号就是：2
                //4
                //5
                long txid = localTxid.get();
                // 2 <= 3
                //4 <= 3
                //5 <= 3
                if(txid <= syncMaxTxid){
                    return;
                }
                //
                if(isWaitSync){
                    //直接返回
                    return;
                }
                //重新赋值
                isWaitSync = true;

                while(isSyncRunning){
                    try {
                        //线程4就会在这儿等待
                        //释放锁
                        /**
                         * 1) 时间到了
                         * 2）被唤醒了
                         */
                        wait(2000);
                    }catch (Exception e){
                        e.printStackTrace();
                    }
                }
                //重新赋值
                isWaitSync = false;
            }

            //交换内存，我是直接交换的内存，肯定是简单粗暴
            //真正的源码里面是有判断。
            //如果来不来就直接交换内存，频繁的交换内存，也是很影响性能的。
            editLogBuffer.setReadyToSync();


            if(editLogBuffer.currentBuffer.size() > 0) {
                //获取当前 内存2（正在往磁盘上面写数据的那个内存）
                //里面元数据日志日志编号最大的是多少
                syncMaxTxid = editLogBuffer.getSyncMaxTxid();

            }

            //说明接下来就要往磁盘上面写元数据日志信息了。
            isSyncRunning = true;
        } //释放锁

        //往磁盘上面写数据（这个操作是很耗费时间的）
        /**
         * 线程一 执行如下代码
         * 在最耗费时间的这段代码上面是没有加锁
         * 几毫秒，几十毫秒
         */
        editLogBuffer.flush(); //这个地方写完了。

        //对于共享变量的操作是要加锁的
        synchronized (this) {
            //状态恢复
            isSyncRunning = false;
            //唤醒当前wait的线程。
            notify();
        }

    }


    /**
     * 使用了面向对象的思想，把一条日志看成一个对象。
     * 日志信息，或者就是我们说的元数据信息。
     */
    class EditLog{ //
        //日志的编号，递增，唯一。
        long txid;
        //日志内容
        String content;//mkdir /data

        //构造函数
        public EditLog(long txid,String content){
            this.txid = txid;
            this.content = content;
        }

        //方便我们打印日志
        @Override
        public String toString() {
            return "EditLog{" +
                    "txid=" + txid +
                    ", content='" + content + '\'' +
                    '}';
        }
    }

    /**
     * 双缓冲算法
     */
    class DoubleBuffer{

               //内存1
        LinkedList<EditLog> currentBuffer = new LinkedList<EditLog>();
        //内存2
        LinkedList<EditLog> syncBuffer= new LinkedList<EditLog>();

        /**
         * 把数据写到当前内存
         * @param log
         */
        public void write(EditLog log){
            currentBuffer.add(log);
        }

        /**
         * 两个内存交换数据
         */
        public void setReadyToSync(){
            LinkedList<EditLog> tmp= currentBuffer;
            currentBuffer = syncBuffer;
            syncBuffer = tmp;
        }

        /**
         * 获取当前正在刷磁盘的内存里的ID最大的值。
         * @return
         */
        public Long getSyncMaxTxid(){
            return syncBuffer.getLast().txid;
        }

        /**
         * 就是把数据涮写到磁盘上面
         * 为了演示效果，所以我们把打印出来
         */
        public void flush(){
            for(EditLog log:syncBuffer){
                System.out.println("存入磁盘日志信息："+log);
            }
            //清空内存
            syncBuffer.clear();
        }
    }
}

//提供远程服务调用功能，对DataNode提供远程服务调用